全托管Flink DataStream作业

Hologres与Flink全托管高度兼容,多数情况下您可以使用Flink SQL的方式,声明Hologres的源表、维表及结果表,进而使用SQL表达数据的处理逻辑。但对于特殊业务场景,Flink SQL方式无法满足业务计算时,您需要使用DataStream的方式读写数据。本文以VVR-8.0.8-Flink-1.17版本为例,为您完整地展示如何调试和开发基于Hologres连接器的DataStream作业。

前提条件

  • 已购买Hologres实例并创建数据库。详情请参见创建数据库

  • 已安装代码开发平台,用于本地代码调试,如IntelliJ IDEA。

步骤一:下载Connector依赖

通过DataStream的方式读写Hologres数据时,您需要下载Hologres连接器连接Flink全托管。目前已发布的连接器版本请参见Hologres DataStream连接器

  1. 您需要下载如下2个依赖JAR包:

    • ververica-connector-hologres-1.17-vvr-8.0.8.jar:用于本地调试。

    • ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar:用于本地调试和线上部署。

      说明

      从VVR-6.0-Flink-1.15版本起,商业版Connector在本地调试时,需要配合相应版本的Uber JAR使用。使用方法请参见本地运行和调试包含连接器的作业

  2. 下载后,使用如下命令将ververica-connector-hologres-1.17-vvr-8.0.8.jar安装至本地Maven仓库中:

    mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar

    其中$path为您本地存放ververica-connector-hologres-1.17-vvr-8.0.8.jar的绝对路径。

步骤二:本地开发及调试

您需要在本地完成项目开发,再在Flink全托管控制台上部署并运行。以Binlog源表为例,项目代码及pom.xml文件如下:

  1. 本地代码编写:

    • DataStream API Demo代码:

      import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
      import com.alibaba.ververica.connectors.hologres.binlog.StartupMode;
      import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource;
      import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
      import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
      import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.TableSchema;
      
      import java.util.Collections;
      
      public class HologresBinlogSourceDemo {
      
          public static void main(String[] args) throws Exception {
              Configuration envConf = new Configuration();
              // 本地调试时,需要指定uber jar的绝对路径;打包上传时请注释掉
              envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf);
              // 初始化读取的表的Schema,需要和Hologres表的字段匹配,可以只定义部分字段。
              TableSchema schema = TableSchema.builder()
              .field("<id>", DataTypes.INT().notNull())
              .primaryKey("<id>")
              .build();
      
              // Hologres的相关参数。
              Configuration config = new Configuration();
              config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>");
              config.setString(HologresConfigs.USERNAME, "<yourUserName>");
              config.setString(HologresConfigs.PASSWORD, "<yourPassword>");
              config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>");
              config.setString(HologresConfigs.TABLE, "<yourTableName>");
              config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
              config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
              // 构建JDBC Options。
              JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
      
              // 构建Hologres Binlog Source。
              long startTimeMs = 0;
              HologresBinlogSource source = new HologresBinlogSource(
                  new HologresConnectionParam(config),
                  schema,
                  config,
                  jdbcOptions,
                  startTimeMs,
                  StartupMode.INITIAL,
                  "",
                  "",
                  -1,
                  Collections.emptySet());
      
              env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
              env.execute();  
          }
      }

      参数说明:

      参数

      描述

      path_to_uber_jar

      本地Uber JAR的绝对路径。对于Windows需要加相应磁盘分区,例如file:///D:/path/to/a-uber.jar

      id

      始化读取的表的Schema,需要和Hologres表的字段匹配,可以只定义部分字段。

      yourEndpoint

      Hologres实例的网络域名。您可以进入Hologres管理控制台的实例详情页,从网络信息中获取域名。

      yourUserName

      阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。

      yourPassword

      对应阿里云账号的AccessKey Secret。

      yourDatabaseName

      Hologres数据库名称。

      yourTableName

      待读取的Hologres表名称。

    • pom.xml文件:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>com.alibaba.hologres</groupId>
          <artifactId>hologres-flink-demo</artifactId>
          <version>1.0-SNAPSHOT</version>
          <packaging>jar</packaging>
      
          <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <flink.version>1.17.2</flink.version>
              <vvr.version>1.17-vvr-8.0.8</vvr.version>
              <target.java.version>1.8</target.java.version>
              <scala.binary.version>2.12</scala.binary.version>
              <maven.compiler.source>${target.java.version}</maven.compiler.source>
              <maven.compiler.target>${target.java.version}</maven.compiler.target>
              <log4j.version>1.7.21</log4j.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-common</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-runtime</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-base</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>com.alibaba.ververica</groupId>
                  <artifactId>ververica-connector-hologres</artifactId>
                  <version>${vvr.version}</version>
              </dependency>
              <!--  日志实现  log4j  依赖  -->
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
                  <version>${log4j.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
                  <version>${log4j.version}</version>
              </dependency>
          </dependencies>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.1.0</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
      
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <createDependencyReducedPom>false</createDependencyReducedPom>
                                  <shadedArtifactAttached>true</shadedArtifactAttached>
                                  <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                                  <transformers>
                                      <transformer
                                              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>
  2. 本地调试及运行。

    • 您需要配置运行所需要的ClassLoader JAR包,即ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar。具体操作请参见步骤二:配置运行所需要的ClassLoader JAR包

    • (可选)如果提示缺少一些常见的Flink类无法执行,例如org.apache.flink.configuration.Configuration,需要在“Modify options”处勾选“Add dependencies with provided scope to classpath”。

    配置完成后,您可在本地调试运行该项目,确保本地可运行成功。

本地调试步骤详情请参见本地运行和调试包含连接器的作业

步骤三:打包运行

本地调试成功后,您可将其进行打包并与Uber JAR一同上传至Flink。

  1. 打包前,注释掉下述代码:

    envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
  2. 编译打包。

    使用Maven编译并打包应用程序及其依赖项。命令如下:

    mvn clean package -DskipTests

    打包成功后,即可在本地生成名为hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar的文件。

  3. 上传JAR包。

    在Flink控制台的资源管理页面上传打包好的程序JAR包和ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar。具体操作请参见步骤二:上传测试JAR包和数据文件image

  4. 部署JAR作业。

    在Flink控制台的作业运维页面部署JAR作业。具体操作及参数信息请参见步骤三:部署JAR作业image

  5. 启动并查看Flink计算结果。

    说明

    若更新JAR包,需要重新上传部署JAR包并启动作业。

    1. 在Flink控制台的作业运维页面,单击目标作业名称操作列中的启动

    2. 配置资源信息和基础设置。

      作业启动参数配置详情请参见作业启动

    3. 单击启动

      单击启动后,作业状态变为运行中,则代表作业运行正常。

常见问题

  • 问题1:当您在IntelliJ IDEA中运行和调试Flink作业时,如果其包含了阿里云实时计算Flink版的商业版连接器依赖,可能会遇到无法找到连接器相关类的运行错误。例如Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter

  • 问题2:提示缺少一些常见的Flink类无法执行,例如Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration

    • 问题原因:可能是缺少依赖或者没有正常加载依赖。

    • 解决方法:

      • pom.xml文件中没有引入相关依赖,大多数情况下可能是flink-connector-base,也可以搜索异常包路径,查看其属于哪个Flink依赖。

      • 可能是运行时没有加载provided依赖。需要在IntelliJ IDEA的“Modify options”处勾选“Add dependencies with provided scope to classpath”。

  • 问题3:运行中报错Incompatible magic value

    • 问题原因:

      • 原因一:可能是使用的Uber JAR与Connector版本不一致。

      • 原因二:可能是ClassLoader设置有误。

    • 解决方法:

  • 问题4:运行时抛出异常Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found

    • 问题原因:目前Uber JAR不支持Windows系统32位的Java。

    • 解决方法:请安装64位的Java,可以通过java -version命令查看Java安装信息,如果不包含64-Bit字样,表明是32位的Java。

  • 问题5:运行时抛出Caused by: java.lang.ClassFormatError

    • 问题原因:可能是由于IntelliJ IDEA配置的JDK版本问题导致。

    • 解决方法:请使用较新的JDK8或者JDK11版本。